{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 第六章：卷积神经网络\n",
    "湖北理工学院《机器学习》课程资料\n",
    "\n",
    "作者：李辉楚吴\n",
    "\n",
    "笔记内容概述: 卷积神经网络、手写字母识别"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import torch\n",
    "import torchvision\n",
    "import torchvision.transforms as transforms\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "label_size = 18 # Label size\n",
    "ticklabel_size = 14 # Tick label size\n",
    "    \n",
    "# Define a transform to normalize the data\n",
    "transform = transforms.Compose([\n",
    "    transforms.ToTensor()\n",
    "])\n",
    "\n",
    "# Load test data from the MNIST\n",
    "testset = torchvision.datasets.MNIST(root='./Data', train=False, download=False, transform=transform)\n",
    "print(f\"Test set size: {len(testset)}\")\n",
    "\n",
    "# Load training data from the MNIST\n",
    "trainset = torchvision.datasets.MNIST(root='./Data', train=True, download=False, transform=transform)\n",
    "print(f\"Training set size: {len(trainset)}\")\n",
    "\n",
    "# Rate of trX and cvX\n",
    "tr_cv_rate = 0.8\n",
    "\n",
    "# Create a list to store indices for each class unique()\n",
    "class_indices = [[] for _ in range(10)]  # 10 classes in MNIST\n",
    "\n",
    "# Populate class_indices\n",
    "for idx, (_, label) in enumerate(trainset):\n",
    "    class_indices[label].append(idx)\n",
    "\n",
    "# Calculate the number of samples for each class in training and validation sets\n",
    "train_size_per_class = int(tr_cv_rate * min(len(indices) for indices in class_indices))\n",
    "val_size_per_class = min(len(indices) for indices in class_indices) - train_size_per_class\n",
    "\n",
    "# Create balanced train and validation sets\n",
    "train_indices = []\n",
    "val_indices = []\n",
    "for indices in class_indices:\n",
    "    train_indices.extend(indices[:train_size_per_class])\n",
    "    val_indices.extend(indices[train_size_per_class:train_size_per_class + val_size_per_class])\n",
    "\n",
    "# Create Subset datasets\n",
    "from torch.utils.data import Subset\n",
    "trX = Subset(trainset, train_indices)\n",
    "cvX = Subset(trainset, val_indices)\n",
    "\n",
    "print(f\"Number of training samples: {len(trX)}\")\n",
    "print(f\"Number of cross-validation samples: {len(cvX)}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "构建DataLoaders，准备训练模型"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "batch_size = 42 # Define training batch 1，\n",
    "\n",
    "def one_hot_collate(batch):\n",
    "    data = torch.stack([item[0] for item in batch])\n",
    "    labels = torch.tensor([item[1] for item in batch])\n",
    "    one_hot_labels = torch.zeros(labels.size(0), 10)  # 10 classes in MNIST 【0，1，0，0】\n",
    "    one_hot_labels.scatter_(1, labels.unsqueeze(1), 1)\n",
    "    return data, one_hot_labels\n",
    "\n",
    "trLoader = torch.utils.data.DataLoader(trX, batch_size=batch_size, shuffle=True, num_workers=0, collate_fn=one_hot_collate)\n",
    "cvLoader = torch.utils.data.DataLoader(cvX, batch_size=batch_size, shuffle=False, num_workers=0, collate_fn=one_hot_collate)\n",
    "teLoader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=0, collate_fn=one_hot_collate)\n",
    "\n",
    "# Get a batch of training data\n",
    "dataiter = iter(trLoader)\n",
    "data, labels = next(dataiter)\n",
    "\n",
    "image_channels = data[0].numpy().shape[0]\n",
    "print(f'image_channels is {image_channels}')\n",
    "print(labels)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 定义并训练卷积神经网络\n",
    "* 输入：2-D图片\n",
    "* 输出：手写字母类型的概率分布\n",
    "* 卷积层：2层, 卷积核大小：3x3\n",
    "* 隐藏层1：100个节点\n",
    "* 隐藏层2：50个节点"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import torch.nn as nn\n",
    "import torch.nn.functional as F\n",
    "import torch.optim as optim\n",
    "\n",
    "class CNN(nn.Module):\n",
    "    def __init__(self, image_channels, num_classes):\n",
    "        super(CNN, self).__init__()\n",
    "        \n",
    "        # First convolutional layer\n",
    "        self.conv1 = nn.Conv2d(in_channels=image_channels, out_channels=32, kernel_size=3, padding=1)\n",
    "        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)\n",
    "        \n",
    "        # Second convolutional layer\n",
    "        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)\n",
    "        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)\n",
    "        \n",
    "        # Fully connected layers\n",
    "        self.fc1 = nn.Linear(64 * 7 * 7, 100)  # After two 2x2 max pools, 28x28 -> 7x7\n",
    "        self.fc2 = nn.Linear(100, num_classes)  # 10 classes output\n",
    "\n",
    "        # Softmax\n",
    "        self.softmax = nn.Softmax(dim=1)\n",
    "        \n",
    "    def forward(self, x):\n",
    "        # Remove the reshape operation and directly use x\n",
    "        x = F.relu(self.conv1(x))\n",
    "        x = self.pool1(x)\n",
    "        \n",
    "        # Second conv layer\n",
    "        x = F.relu(self.conv2(x))\n",
    "        x = self.pool2(x)\n",
    "        \n",
    "        # Flatten the output for the fully connected layers\n",
    "        x = x.view(-1, 64 * 7 * 7)\n",
    "        \n",
    "        # Fully connected layers\n",
    "        x = F.relu(self.fc1(x))\n",
    "        x = self.fc2(x)\n",
    "        \n",
    "        # Softmax\n",
    "        x = self.softmax(x)\n",
    "        \n",
    "        return x\n",
    "\n",
    "# Initialize the model\n",
    "model = CNN(image_channels, 10)\n",
    "if torch.cuda.is_available():\n",
    "    model = model.cuda()\n",
    "\n",
    "# Display model architecture\n",
    "print(model)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "使用Adam作为Optimizor训练模型"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define loss function and optimizer\n",
    "criterion = nn.CrossEntropyLoss() # Loss\n",
    "optimizer = torch.optim.Adam(model.parameters()) # Adam\n",
    "\n",
    "# Lists to store losses\n",
    "train_losses = []\n",
    "cv_losses = []\n",
    "\n",
    "# Number of epochs\n",
    "num_epochs = 50\n",
    "\n",
    "for epoch in range(num_epochs):\n",
    "    model.train()\n",
    "    batch_losses = []\n",
    "    \n",
    "    for batch_x, batch_y in trLoader:\n",
    "        # Forward pass\n",
    "        outputs = model(batch_x)\n",
    "        loss = criterion(outputs, batch_y)\n",
    "        \n",
    "        # Backward pass and optimize\n",
    "        optimizer.zero_grad()\n",
    "        loss.backward()\n",
    "        optimizer.step()\n",
    "        \n",
    "        batch_losses.append(loss.item())\n",
    "    \n",
    "    # Calculate average training loss for this epoch\n",
    "    avg_train_loss = sum(batch_losses) / len(batch_losses)\n",
    "    train_losses.append(avg_train_loss)\n",
    "    \n",
    "    # Evaluate on cross-validation set\n",
    "    model.eval()\n",
    "    cv_batch_losses = []\n",
    "    with torch.no_grad():\n",
    "        for cv_x, cv_y in cvLoader:\n",
    "            cv_outputs = model(cv_x)\n",
    "            cv_loss = criterion(cv_outputs, cv_y)\n",
    "            cv_batch_losses.append(cv_loss.item())\n",
    "    \n",
    "    avg_cv_loss = sum(cv_batch_losses) / len(cv_batch_losses)\n",
    "    cv_losses.append(avg_cv_loss)\n",
    "    \n",
    "    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, CV Loss: {avg_cv_loss:.4f}')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "计算识别精度，展示学习曲线"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Calculate and print accuracies for training and cross-validation sets\n",
    "model.eval()\n",
    "with torch.no_grad():\n",
    "    # Training set accuracy\n",
    "    tr_correct = 0\n",
    "    tr_total = 0\n",
    "    for images, labels in trLoader:\n",
    "        outputs = model(images)\n",
    "        _, predicted = torch.max(outputs, 1)\n",
    "        _, true_labels = torch.max(labels, 1)\n",
    "        tr_total += labels.size(0)\n",
    "        tr_correct += (predicted == true_labels).sum().item()\n",
    "    \n",
    "    tr_accuracy = 100 * tr_correct / tr_total\n",
    "    \n",
    "    # Test set accuracy\n",
    "    cv_correct = 0\n",
    "    cv_total = 0\n",
    "    for images, labels in cvLoader:\n",
    "        outputs = model(images)\n",
    "        _, predicted = torch.max(outputs, 1)\n",
    "        _, true_labels = torch.max(labels, 1)\n",
    "        cv_total += labels.size(0)\n",
    "        cv_correct += (predicted == true_labels).sum().item()\n",
    "    \n",
    "    cv_accuracy = 100 * cv_correct / cv_total\n",
    "\n",
    "print(f'Accuracy on training set: {tr_accuracy:.2f}%')\n",
    "print(f'Accuracy on cross-validation set: {cv_accuracy:.2f}%')\n",
    "\n",
    "# Plot training and cross-validation losses\n",
    "plt.figure(figsize=(10, 5))\n",
    "plt.plot(range(1, num_epochs+1), train_losses, label='Training Loss')\n",
    "plt.plot(range(1, num_epochs+1), cv_losses, label='Cross-Validation Loss')\n",
    "plt.xlabel('Epoch')\n",
    "plt.ylabel('Loss')\n",
    "plt.title('Training and Cross-Validation Loss')\n",
    "plt.legend()\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 实验九"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "使用训练数据生成DataSet和DataLoader对象用于模型训练\n",
    "使用torch.nn定义CNN类，可自定隐藏层数和卷积核数量\n",
    "用Adam的优化方式训练模型\n",
    "在验证或测试集上测试模型性能，画出学习曲线并分析结果"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 2. 定义卷积神经网络（CNN）\n",
    "class CNN(nn.Module):\n",
    "    def __init__(self, num_classes=10):\n",
    "        super(CNN, self).__init__()\n",
    "        \n",
    "        # 卷积层1：输入3通道图像，输出16个通道，卷积核大小3x3\n",
    "        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=1)\n",
    "        # 卷积层2：输入16通道图像，输出32个通道，卷积核大小3x3\n",
    "        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1)\n",
    "        # 卷积层3：输入32通道图像，输出64个通道，卷积核大小3x3\n",
    "        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)\n",
    "        \n",
    "        # 全连接层\n",
    "        self.fc1 = nn.Linear(64 * 8 * 8, 512)  # 输入大小为64x8x8，输出512个节点\n",
    "        self.fc2 = nn.Linear(512, num_classes)  # 输出层，10个类别\n",
    "\n",
    "    def forward(self, x):\n",
    "        # 卷积层1 + 激活函数 + 池化层\n",
    "        x = F.relu(self.conv1(x))\n",
    "        x = F.max_pool2d(x, 2, 2)  # 2x2池化\n",
    "\n",
    "        # 卷积层2 + 激活函数 + 池化层\n",
    "        x = F.relu(self.conv2(x))\n",
    "        x = F.max_pool2d(x, 2, 2)  # 2x2池化\n",
    "\n",
    "        # 卷积层3 + 激活函数 + 池化层\n",
    "        x = F.relu(self.conv3(x))\n",
    "        x = F.max_pool2d(x, 2, 2)  # 2x2池化\n",
    "\n",
    "        # 展平层\n",
    "        x = x.view(-1, 64 * 8 * 8)  # 展平为一维向量\n",
    "\n",
    "        # 全连接层\n",
    "        x = F.relu(self.fc1(x))\n",
    "        x = self.fc2(x)  # 输出层\n",
    "        return x\n",
    "\n",
    "# 3. 训练函数\n",
    "def train_model(model, trainloader, testloader, num_epochs=10, lr=0.001):\n",
    "    criterion = nn.CrossEntropyLoss()\n",
    "    optimizer = optim.Adam(model.parameters(), lr=lr)\n",
    "\n",
    "    train_loss_history = []\n",
    "    train_accuracy_history = []\n",
    "    \n",
    "    for epoch in range(num_epochs):\n",
    "        model.train()  # 设置为训练模式\n",
    "        running_loss = 0.0\n",
    "        correct = 0\n",
    "        total = 0\n",
    "        for inputs, labels in trainloader:\n",
    "            optimizer.zero_grad()\n",
    "\n",
    "            # Forward pass\n",
    "            outputs = model(inputs)\n",
    "            loss = criterion(outputs, labels)\n",
    "\n",
    "            # Backward pass\n",
    "            loss.backward()\n",
    "            optimizer.step()\n",
    "\n",
    "            running_loss += loss.item()\n",
    "\n",
    "            _, predicted = torch.max(outputs.data, 1)\n",
    "            total += labels.size(0)\n",
    "            correct += (predicted == labels).sum().item()\n",
    "\n",
    "        epoch_loss = running_loss / len(trainloader)\n",
    "        epoch_accuracy = 100 * correct / total\n",
    "        train_loss_history.append(epoch_loss)\n",
    "        train_accuracy_history.append(epoch_accuracy)\n",
    "\n",
    "        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%')\n",
    "\n",
    "    # 绘制训练过程中的损失和准确率曲线\n",
    "    plt.figure(figsize=(12, 6))\n",
    "\n",
    "    # 绘制损失曲线\n",
    "    plt.subplot(1, 2, 1)\n",
    "    plt.plot(train_loss_history, label=\"Training Loss\")\n",
    "    plt.title(\"Training Loss\")\n",
    "    plt.xlabel(\"Epoch\")\n",
    "    plt.ylabel(\"Loss\")\n",
    "\n",
    "    # 绘制准确率曲线\n",
    "    plt.subplot(1, 2, 2)\n",
    "    plt.plot(train_accuracy_history, label=\"Training Accuracy\")\n",
    "    plt.title(\"Training Accuracy\")\n",
    "    plt.xlabel(\"Epoch\")\n",
    "    plt.ylabel(\"Accuracy (%)\")\n",
    "\n",
    "    plt.tight_layout()\n",
    "    plt.show()\n",
    "\n",
    "    return model\n",
    "\n",
    "# 初始化模型\n",
    "model_cnn = CNN(num_classes=10)\n",
    "\n",
    "# 训练模型（CIFAR-10任务）\n",
    "model_cnn = train_model(model_cnn, trainloader, testloader, num_epochs=10)\n",
    "\n",
    "# 4. 在测试集上评估模型性能\n",
    "def test_model(model, testloader):\n",
    "    model.eval()  # 设置为评估模式\n",
    "    correct = 0\n",
    "    total = 0\n",
    "    with torch.no_grad():\n",
    "        for inputs, labels in testloader:\n",
    "            outputs = model(inputs)\n",
    "            _, predicted = torch.max(outputs.data, 1)\n",
    "            total += labels.size(0)\n",
    "            correct += (predicted == labels).sum().item()\n",
    "\n",
    "    accuracy = 100 * correct / total\n",
    "    print(f'Test Accuracy: {accuracy:.2f}%')\n",
    "\n",
    "# 测试模型（CIFAR-10任务）\n",
    "test_model(model_cnn, testloader)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "machinelearning",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
